MINIFICPP-2760 Input/OutputStreamCallback should return std::expected#2148
MINIFICPP-2760 Input/OutputStreamCallback should return std::expected#2148martinzink wants to merge 7 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates MiNiFi C++ stream callback APIs to return an expected-style result (io::IoResult / io::ReadWriteResult) instead of raw int64_t / std::optional, and propagates that change through core session I/O, utilities (e.g., internal::pipe), processors, extensions, and tests.
Changes:
- Introduce
io::IoResultandio::ReadWriteResultand switchInputStreamCallback/OutputStreamCallback/InputOutputStreamCallbackto use them. - Update core session read/write/readWrite plumbing and various utilities/serializers to handle the new result types.
- Update processors, extensions, and unit tests to return
IoResult/ReadWriteResultconsistently.
Reviewed changes
Copilot reviewed 77 out of 77 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| minifi-api/common/include/minifi-cpp/io/StreamCallback.h | Adds IoResult/ReadWriteResult and changes stream callback typedefs to return them. |
| libminifi/test/unit/SiteToSiteTests.cpp | Updates session->write callback to return IoResult. |
| libminifi/test/unit/MetricsTests.cpp | Updates read/write/append callbacks to return IoResult. |
| libminifi/test/unit/FlowFileSerializationTests.cpp | Updates serializer reader callbacks to return IoResult. |
| libminifi/test/libtest/unit/ContentRepositoryDependentTests.h | Updates test callbacks to return IoResult and use error/cancel helpers. |
| libminifi/src/sitetosite/SiteToSiteClient.cpp | Updates session callbacks used for S2S send/receive to return IoResult. |
| libminifi/src/sitetosite/CompressionOutputStream.cpp | Adjusts use of internal::pipe after it returns IoResult. |
| libminifi/src/minifi-c.cpp | Bridges C callbacks into IoResult for ProcessSession read/write. |
| libminifi/src/core/ProcessSessionReadCallback.cpp | Updates file-copy read callback to return IoResult. |
| libminifi/src/core/ProcessSession.cpp | Updates ProcessSession callbacks handling to IoResult/ReadWriteResult. |
| libminifi/src/c2/C2Utils.cpp | Updates internal::pipe usage after return type change. |
| libminifi/include/core/ProcessSessionReadCallback.h | Updates callback signature to IoResult. |
| extensions/standard-processors/tests/unit/XMLRecordSetWriterTests.cpp | Updates callback return to IoResult. |
| extensions/standard-processors/processors/TailFile.cpp | Updates file reader callbacks to return IoResult. |
| extensions/standard-processors/processors/SplitText.cpp | Updates read/write callbacks and return type to IoResult. |
| extensions/standard-processors/processors/SplitRecord.cpp | Updates record-set read callback to return IoResult. |
| extensions/standard-processors/processors/SplitJson.cpp | Updates write callback to return IoResult. |
| extensions/standard-processors/processors/RouteText.cpp | Updates read callback to return IoResult. |
| extensions/standard-processors/processors/HashContent.cpp | Updates read callback to return IoResult. |
| extensions/standard-processors/processors/ExtractText.h | Updates read callback signature to IoResult. |
| extensions/standard-processors/processors/ExtractText.cpp | Updates read callback implementation to return IoResult. |
| extensions/standard-processors/processors/EvaluateJsonPath.cpp | Updates write callback to return IoResult. |
| extensions/standard-processors/processors/DefragmentText.cpp | Updates serializer reader and append callback to return IoResult. |
| extensions/standard-processors/processors/ConvertRecord.cpp | Updates record-set read callback to return IoResult. |
| extensions/standard-processors/controllers/XMLRecordSetWriter.cpp | Updates write callback to return IoResult. |
| extensions/standard-processors/controllers/JsonRecordSetWriter.cpp | Updates write callbacks to return IoResult. |
| extensions/sftp/processors/PutSFTP.cpp | Updates SFTP upload read callback to return IoResult. |
| extensions/sftp/processors/FetchSFTP.cpp | Updates SFTP download write callback to return IoResult. |
| extensions/rocksdb-repos/tests/SwapTests.cpp | Updates test write callback to return IoResult. |
| extensions/python/types/PyRecordSetReader.cpp | Updates read callback return type to IoResult. |
| extensions/python/types/PyProcessSession.cpp | Updates Python session read/write callbacks to return IoResult. |
| extensions/opencv/MotionDetector.cpp | Updates read/write callbacks to return IoResult. |
| extensions/opencv/CaptureRTSPFrame.cpp | Updates write callback to return IoResult. |
| extensions/mqtt/processors/PublishMQTT.cpp | Updates record-set read callback to return IoResult. |
| extensions/mqtt/processors/ConsumeMQTT.h | Updates write callback signature to IoResult. |
| extensions/mqtt/processors/ConsumeMQTT.cpp | Updates write callback implementation to return IoResult. |
| extensions/lua/LuaProcessSession.cpp | Updates Lua session read/write callbacks to return IoResult. |
| extensions/libarchive/tests/MergeFileTests.cpp | Updates serializer reader callbacks to return IoResult. |
| extensions/libarchive/tests/CompressContentTests.cpp | Updates read callback to return IoResult. |
| extensions/libarchive/UnfocusArchiveEntry.h | Updates write callback signature to IoResult. |
| extensions/libarchive/UnfocusArchiveEntry.cpp | Updates write callback implementation to return IoResult. |
| extensions/libarchive/MergeContent.h | Updates merge write callbacks to return IoResult. |
| extensions/libarchive/MergeContent.cpp | Updates serializer reader wrapper to return IoResult. |
| extensions/libarchive/FocusArchiveEntry.h | Updates read callback signature to IoResult. |
| extensions/libarchive/FocusArchiveEntry.cpp | Updates read callback implementation to return IoResult. |
| extensions/libarchive/CompressContent.h | Updates compress callback and nested read callback to return IoResult. |
| extensions/libarchive/CompressContent.cpp | Updates transformer/read/write callback wiring to IoResult. |
| extensions/kafka/PublishKafka.cpp | Updates flowfile read callback to return IoResult. |
| extensions/gcp/processors/PutGCSObject.cpp | Updates upload read callback to return IoResult. |
| extensions/gcp/processors/FetchGCSObject.cpp | Updates download write callback to return IoResult. |
| extensions/couchbase/processors/GetCouchbaseKey.cpp | Updates write callback to return IoResult. |
| extensions/bustache/ApplyTemplate.cpp | Updates write callback to return IoResult. |
| extensions/azure/storage/AzureDataLakeStorage.cpp | Adapts internal::pipe return to optional via expected. |
| extensions/azure/storage/AzureBlobStorage.cpp | Adapts internal::pipe return to optional via expected. |
| extensions/azure/processors/PutAzureDataLakeStorage.h | Updates read callback signature to IoResult. |
| extensions/azure/processors/PutAzureDataLakeStorage.cpp | Updates read callback implementation to return IoResult. |
| extensions/azure/processors/PutAzureBlobStorage.h | Updates inline read callback implementation to return IoResult. |
| extensions/azure/processors/FetchAzureDataLakeStorage.cpp | Updates write callback to return IoResult. |
| extensions/azure/processors/FetchAzureBlobStorage.cpp | Updates write callback to return IoResult. |
| extensions/aws/processors/PutS3Object.cpp | Updates read callback to return IoResult and handles multipart logic. |
| extensions/aws/processors/FetchS3Object.cpp | Updates write callback to return IoResult. |
| extension-framework/src/utils/file/FileWriterCallback.cpp | Updates file writer read callback to return IoResult. |
| extension-framework/src/utils/file/FileReaderCallback.cpp | Updates file reader write callback to return IoResult. |
| extension-framework/src/serialization/PayloadSerializer.cpp | Updates serializer serialize() return type to IoResult. |
| extension-framework/src/serialization/FlowFileV3Serializer.cpp | Updates serializer serialize() return type to IoResult. |
| extension-framework/include/utils/file/FileWriterCallback.h | Updates callback signature to IoResult. |
| extension-framework/include/utils/file/FileReaderCallback.h | Updates callback signature to IoResult. |
| extension-framework/include/serialization/PayloadSerializer.h | Updates serializer interface to return IoResult. |
| extension-framework/include/serialization/FlowFileV3Serializer.h | Updates serializer interface to return IoResult. |
| extension-framework/include/serialization/FlowFileSerializer.h | Updates serializer interface and reader function type to return IoResult. |
| extension-framework/cpp-extension-lib/src/core/ProcessSession.cpp | Converts IoResult back to int64_t for the C API bridge. |
| core-framework/src/utils/LineByLineInputOutputStreamCallback.cpp | Updates read/write callback return type to ReadWriteResult. |
| core-framework/include/utils/LineByLineInputOutputStreamCallback.h | Updates callback signature to ReadWriteResult. |
| core-framework/include/utils/JsonCallback.h | Updates JSON callbacks to return IoResult. |
| core-framework/include/utils/ByteArrayCallback.h | Updates byte-array callback to return IoResult. |
| core-framework/include/io/StreamPipe.h | Changes internal::pipe and related pipes to return IoResult. |
| core-framework/include/http/HTTPCallback.h | Updates HTTP streaming callback processing to return IoResult. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
fgerlits
left a comment
There was a problem hiding this comment.
Thank you for doing this! I think it improves readability significantly.
35e35c3 to
a36edb4
Compare
| if (res < ARCHIVE_WARN) { | ||
| logger_->log_warn("FocusArchiveEntry got archive warning while reading header: {}", archive_error_string(input_archive.get())); | ||
| return nlen; | ||
| return io::IoResult::zero(); |
There was a problem hiding this comment.
nlen seems to be updated in this loop or is it always 0 at this point?
| static IoResult fromSizeT(const size_t val) { | ||
| if (isError(val)) { return IoResult::error(); } | ||
| return IoResult(gsl::narrow<uint64_t>(val)); | ||
| } |
There was a problem hiding this comment.
I think we should either clarify how these differ (usage wise) and let that be reflected in the name or drop the type from the name (or if we want to prevent casts we could move to a template and branch with std::is_same_v and friends)
This PR addresses a valid critisim during the review in #2120 (comment)
I've switched to an expected result while keeping the i64 on the C API.
We already abused (and unfortuently will continue to abuse but less so ) the u64 and size_t to hide our errors in them. We used static_cast(-1) to signal errors, but now we have multiple erros, so this PR switches most of the code (but not all) to instead signal the errors in the unexpected value and one could be certain that if the expected has value it is a valid size.
I've implemented the back and forth conversions from the old i64, and hacked u64 to the new std::expected<u64, MinifiIoStatus>
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically main)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.